我们都知道,RDD存在着依赖关系,这些依赖关系形成了有向无环图DAG,DAG通过DAGScheduler进行Stage的划分,并基于每个Stage生成了TaskSet,提交给TaskScheduler。那么这整个过程在源码中是如何体现的呢?
1.作业的提交
1 | // SparkContext.scala |
1 | // DAGScheduler.scala |
可以看到,SparkContext的runjob方法调用了DAGScheduler的runjob方法正式向集群提交任务,最终调用了submitJob方法。
1 | 1// DAGScheduler.scala |
这里向eventProcessLoop对象发送了JobSubmitted消息。
1 | 1// DAGScheduler.scala |
DAGSchedulerEventProcessLoop对接收到的消息进行处理,在doOnReceive方法中形成一个event loop。
接下来将调用submitStage()方法进行stage的划分。
2.stage的划分
1 | 1// DAGScheduler.scala |
在submitStage方法中判断Stage的父Stage有没有被提交,直到所有父Stage都被提交,只有等父Stage完成后才能调度子Stage。
1 | 1// DAGScheduler.scala |
getmissingParentStages()方法为核心方法。
这里我们要懂得这样一个逻辑:我们都知道,Stage是通过shuffle划分的,所以,每一Stage都是以shuffle开始的,若一个RDD是宽依赖,则必然说明该RDD的父RDD在另一个Stage中,若一个RDD是窄依赖,则该RDD所依赖的父RDD还在同一个Stage中,我们可以根据这个逻辑,找到该Stage的父Stage。